 1.Flink的State--状态原理及原理剖析之状态类型
   
   State:用来保存计算结果或缓存数据。
   sum
   Flink根据是否需要保存中间结果，把计算分为有状态计算和无状态计算
   有状态计算：依赖之前或之后的事件
   无状态计算：独立
   根据数据结构不同，Flink定义了多种state，应用于不同的场景
       ValueState：即类型为T的单值状态。这个状态与对应的key绑定，是最简单的状态了。它可以通
过 update 方法更新状态值，通过 value() 方法获取状态值。
       ListState：即key上的状态值为一个列表。可以通过 add 方法往列表中附加值；也可以通过 get()
方法返回一个 Iterable<T> 来遍历状态值。
       ReducingState：这种状态通过用户传入的reduceFunction，每次调用 add 方法添加值的时候，
会调用reduceFunction，最后合并到一个单一的状态值。
       FoldingState：跟ReducingState有点类似，不过它的状态值类型可以与 add 方法中传入的元素类
型不同（这种状态将会在Flink未来版本中被删除）。
       MapState：即状态值为一个map。用户通过 put 或 putAll 方法添加元素
   State按照是否有key划分为KeyedState和OperatorState
   Keyed State:KeyedStream流上的每一个Key都对应一个State
   案例：利用state求平均值
   原始数据：（1,3）（1,5）（1,7）（1,4）（1,2）
   思路：
   1、 读数据源
   2、 将数据源根据key分组
   3、 按照key分组策略，对流式数据调用状态化处理
     在处理过程中：
     a、实例化出一个状态实例
<T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);
ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>
(
            "average",
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
           })
           , Tuple2.of(0L, 0L)
       );
getRuntimeContext().getState(descriptor);
/**
* Creates a new {@code ValueStateDescriptor} with the given name and
default value.
*
* @deprecated Use {@link #ValueStateDescriptor(String, TypeInformation)}
instead and manually
* manage the default value by checking whether the contents of the state is
{@code null}.
*
* @param name The (unique) name for the state.
* @param typeInfo The type of the values in the state.
* @param defaultValue The default value that will be set when requesting
state without setting
*           a value before.
*/
	@Deprecated
	public ValueStateDescriptor(String name, TypeInformation<T> typeInfo, T
defaultValue) {
		super(name, typeInfo, defaultValue);
	}
	 b、随着流式数据的到来，更新状态
	 sum.update(currentSum);
     void update(T value) throws IOException;
   4、 输出计算结果
   keyed State:代码：
package com.lagou.state;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StateDemo {
    public static void main(String[] args) throws Exception {
        // （1,3）（1,5）（1,7）（1,4）（1,2）
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<Long, Long>> data = env.fromElements(
                new Tuple2(1l, 3l), new Tuple2(1l, 5l),
                new Tuple2(1l, 7l), new Tuple2(1l, 4l),
                new Tuple2(1l, 2l));
        KeyedStream<Tuple2<Long, Long>, Long> keyed = data.keyBy(value -> value.f0);
        
        //按照key分组策略，对流式数据调用状态化处理
        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMaped = keyed.flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            ValueState<Tuple2<Long, Long>> sumState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // 在open方法中做出state
                ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                        "average",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
                        }),
                        Tuple2.of(0L, 0L)
                );
                sumState = getRuntimeContext().getState(descriptor);
                super.open(parameters);
            }

            @Override
            public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
                // 在flatMap方法中更新state
                Tuple2<Long, Long> currentSum = sumState.value();
                currentSum.f0 += 1;
                currentSum.f1 += value.f1;

                sumState.update(currentSum);

                if (currentSum.f0 == 5) {
                    long avarage = currentSum.f1 / currentSum.f0;
                    out.collect(new Tuple2<>(value.f0, avarage));
                    sumState.clear();
                }

            }
        });

        flatMaped.print();
        env.execute();
    }
}
      
   Operator State代码：
   ListCheckPointed
   CheckPointedFunction
   见状态存储
   1).Keyed State
   表示和Key相关的一种State，只能用于KeydStream类型数据集对应的Functions和 Operators之上。
Keyed State是 Operator State的特例，区别在于 Keyed State 事先按照key对数据集进行了分区，每个
Key State 仅对应ー个 Operator和Key的组合。Keyed State可以通过 Key Groups 进行管理，主要用于
当算子并行度发生变化时，自动重新分布Keyed State数据。在系统运行过程中，一个Keyed算子实例
可能运行一个或者多个Key Groups的keys。
   2).Operator State
   与 Keyed State不同的是， Operator State只和并行的算子实例绑定，和数据元素中的key无关，每个
算子实例中持有所有数据元素中的一部分状态数据。Operator State支持当算子实例并行度发生变化时
自动重新分配状态数据。
   
   同时在 Flink中 Keyed State和 Operator State均具有两种形式，其中一种为托管状态(Managed
State)形式，由 Flink Runtime中控制和管理状态数据，并将状态数据转换成为内存 Hash tables或
ROCKSDB的对象存储，然后将这些状态数据通过内部的接口持久化到 Checkpoints 中，任务异常时可
以通过这些状态数据恢复任务。另外一种是原生状态（Raw State）形式，由算子自己管理数据结构，
当触发 Checkpoint过程中， Flink并不知道状态数据内部的数据结构，只是将数据转换成bys数据存储
在 Checkpoints中，当从Checkpoints恢复任务时，算子自己再反序列化出状态的数据结构。
   Datastream API支持使用 Managed State和 Raw State两种状态形式，在 Flink中推荐用户使用
Managed State管理状态数据，主要原因是 Managed State 能够更好地支持状态数据的重平衡以及更
加完善的内存管理。
 
 2.状态描述
   
   State 既然是暴露给用户的，那么就需要有一些属性需要指定：state 名称、val serializer、state type
info。在对应的statebackend中，会去调用对应的create方法获取到stateDescriptor中的值。Flink通
过 StateDescriptor 来定义一个状态。这是一个抽象类，内部定义了状态名称、类型、序列化器等基
础信息。与上面的状态对应,从 StateDescriptor 派生了 ValueStateDescriptor,ListStateDescriptor 
等descriptor
        ValueState getState(ValueStateDescriptor)
        ReducingState getReducingState(ReducingStateDescriptor)
        ListState getListState(ListStateDescriptor)
        FoldingState getFoldingState(FoldingStateDescriptor)
        MapState getMapState(MapStateDescriptor)

package com.lagou.state;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * 1、在flink中。做出OperatorState有两种方式：1、实现CheckpointedFunction接口 2、实现ListCheckPointed
 * 2、两个方法：initializeState/snapshotState
 * initializeState：每一个Function在最开始的实例化的时候调用，方法内,实例化状态
 * snapshotState：每次checkpoint的时候被调用，将操作的最新数据放到最新的检查点
 * 3、invoke：
 * 每来一个数据调用一次，把所有到来的数据都放到缓存器中
 * 目的是为了checkpoint的时候，从缓存器中拿取数据
 */
public class OperatorStateDemo implements SinkFunction<Tuple2<Long, Long>>, CheckpointedFunction {
    ListState<Tuple2<Long, Long>> operatorState;
    int threeshould;

    private List<Tuple2<Long, Long>> bufferedElements;

    public OperatorStateDemo(int threeshould) {
        this.threeshould = threeshould;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        System.out.println("....snapshotState");
        this.operatorState.clear();
        for (Tuple2<Long, Long> element : bufferedElements) {
            operatorState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        System.out.println("....initializeState");
        //做出一个state
        ListStateDescriptor<Tuple2<Long, Long>> operatorDemoDescriptor = new ListStateDescriptor<>(
                "operatorDemo",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {

                })
        );
        operatorState = context.getOperatorStateStore().getListState(operatorDemoDescriptor);
        if (context.isRestored()) {
            for (Tuple2<Long, Long> element : operatorState.get()) {
                bufferedElements.add(element);
            }
            System.out.println("...context.isRestored():true");
        }
    }

    @Override
    public void invoke(Tuple2<Long, Long> value, Context context) throws Exception {
        bufferedElements.add(value);

        if (bufferedElements.size() == threeshould) {
            for (Tuple2<Long, Long> element : bufferedElements) {
                System.out.println("...out:" + element);
            }
            bufferedElements.clear();
        }
    }
}


package com.lagou.state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StateDemo1 {
    public static void main(String[] args) throws Exception {
        // （1,3）（1,5）（1,7）（1,4）（1,2）
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000);
        DataStreamSource<String> data = env.socketTextStream("linux121", 7777);
        SingleOutputStreamOperator<Tuple2<Long, Long>> maped = data.map(new MapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(String value) throws Exception {
                String[] split = value.split(",");
                return new Tuple2<Long, Long>(Long.valueOf(split[0]), Long.valueOf(split[1]));
            }
        });
//        DataStreamSource<Tuple2<Long, Long>> data = env.fromElements(
//                new Tuple2(1l, 3l), new Tuple2(1l, 5l),
//                new Tuple2(1l, 7l), new Tuple2(1l, 4l),
//                new Tuple2(1l, 2l));
        KeyedStream<Tuple2<Long, Long>, Long> keyed = maped.keyBy(value -> value.f0);
        
        //按照key分组策略，对流式数据调用状态化处理
        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMaped = keyed.flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            ValueState<Tuple2<Long, Long>> sumState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // 在open方法中做出state
                ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                        "average",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
                        }),
                        Tuple2.of(0L, 0L)
                );
                sumState = getRuntimeContext().getState(descriptor);
                super.open(parameters);
            }

            @Override
            public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
                // 在flatMap方法中更新state
                Tuple2<Long, Long> currentSum = sumState.value();
                currentSum.f0 += 1;
                currentSum.f1 += value.f1;

                sumState.update(currentSum);

                if (currentSum.f0 == 2) {
                    long avarage = currentSum.f1 / currentSum.f0;
                    out.collect(new Tuple2<>(value.f0, avarage));
                    sumState.clear();
                }

            }
        });

//        flatMaped.print();
        flatMaped.addSink(new OperatorStateDemo(5));
        env.execute();
    }
}
